feat(adapter/nemo): cleanup checkpoint container on_train_end #94
Leahlijuan wants to merge 9 commits into main
Conversation
```python
for cb in mlf_callbacks:
    cb.replication_manager = replication_manager
```
Should we just initialize this earlier and pass it to the callback? Or do you think this is better? It might complicate the recipes a little, which isn't great, but it would be more straightforward.
Keep it for now so that we don't need to modify the recipes.
```cpp
namespace fs = std::filesystem;

namespace {
// We use a fork/exec approach calling 'rm -rf' here instead of
```
Interesting, is this the only way to get around that? Do we know why there was a segfault?
```diff
 if os.path.isdir(container_id):
     # Use shutil.rmtree for recursive deletion.
-    shutil.rmtree(container_id)
+    shutil.rmtree(container_id, onerror=_onerror)
```
A note: this delete_container function is synchronous and doesn't use the async C++ impl for delete dir, so we should be careful about when we use each. We might want to make this call that one, and allow for blocking, for consistency.
Since we delete_container before it's fully finished (save + replication), the transfer service might make changes to the dir at the same time (in the receiver, we save to a tmp file first and then rename it), so there could be file-not-found errors.
```cpp
int peer_port_;
size_t max_size_;
std::queue<int> available_connections_;  // Guarded by mtx_.
```
Can you explain the rationale for using a queue here and an unordered set below? Specifically, why is FIFO order relevant for available_connections_?
Also, are these two collections mutually exclusive?
Yes, they should be mutually exclusive. As for the queue usage for available_connections_, ideally there is no difference between using a queue or a stack; it's just a way to track the connections and grab one to use quickly.
I'm adding active_connections_ because we want to destroy all the live connections during shutdown.
```cpp
}
if (reuse) {
  if (available_connections_.size() < max_size_) {
    LOG(INFO) << "ConnectionPool::ReleaseConnection: reuse connection";
```
These INFO logs here and below are debug logs; I would avoid using INFO for them so we don't pollute the logs. Maybe use VLOG(3).
Can follow this guideline:

| Level | Usage Case | Frequency |
|---|---|---|
| `VLOG(1)` | High-level flow: major state changes or important function entries (e.g., "Pool initialized", "Connection created"). | Low |
| `VLOG(2)` | Detailed events: per-request or per-connection logic (e.g., "Connection X added to active set"). | Medium |
| `VLOG(3)` | Deep tracing: logic branches inside loops or complex conditionals. | High |
| `VLOG(4)+` | Extremely noisy: byte-level data, heartbeats, or internal mutex locking/unlocking details. | Very High |
This change adds on_train_end to the MLFlashpointCheckpointCallback, which shuts down the replication manager once the ML-Flashpoint checkpoint save is done, and removes the mlf checkpoint container.